---
title: "Cache"
description: ""
icon: "database"
---

## Overview

Caching is a technique used to temporarily store copies of data or computation results to improve performance by reducing the need to repeatedly fetch or compute the same data from slower or more resource-intensive sources.

In the context of AI applications, caching provides several important benefits:

- 🚀 **Performance improvement** - Avoid repeating expensive operations like API calls or complex calculations
- 💰 **Cost reduction** - Minimize repeated calls to paid services (like external APIs or LLM providers)
- ⚡ **Latency reduction** - Deliver faster responses to users by serving cached results
- 🔄 **Consistency** - Ensure consistent responses for identical inputs

BeeAI framework provides a robust caching system with multiple implementations to suit different use cases.

---

## Core concepts

### Cache types

BeeAI framework offers several cache implementations out of the box:

| Type                   | Description                                                          |
| :--------------------- | :------------------------------------------------------------------- |
| **UnconstrainedCache** | Simple in-memory cache with no limits                                |
| **SlidingCache**       | In-memory cache that maintains a maximum number of entries           |
| **FileCache**          | Persistent cache that stores data on disk                            |
| **NullCache**          | Special implementation that performs no caching (useful for testing) |

Each cache type implements the `BaseCache` interface, making them interchangeable in your code.

### Usage patterns

BeeAI framework supports several caching patterns:

| Usage pattern           | Description                                     |
| :---------------------- | :-----------------------------------------------|
| **Direct caching**      | Manually store and retrieve values              |
| **Function decoration** | Automatically cache function returns            |
| **Tool integration**    | Cache tool execution results                    |
| **LLM integration**     | Cache model responses                           |

---

## Basic usage

### Caching function output

The simplest way to use caching is to wrap a function that produces deterministic output:

<CodeGroup>

{/* <!-- embedme python/examples/cache/unconstrained_cache_function.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: UnconstrainedCache[int] = UnconstrainedCache()

    async def fibonacci(n: int) -> int:
        cache_key = str(n)
        cached = await cache.get(cache_key)
        if cached:
            return int(cached)

        if n < 1:
            result = 0
        elif n <= 2:
            result = 1
        else:
            result = await fibonacci(n - 1) + await fibonacci(n - 2)

        await cache.set(cache_key, result)
        return result

    print(await fibonacci(10))  # 55
    print(await fibonacci(9))  # 34 (retrieved from cache)
    print(f"Cache size {await cache.size()}")  # 10


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```
{/* <!-- embedme typescript/examples/cache/unconstrainedCacheFunction.ts --> */}
```ts TypeScript [expandable]
import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";

const cache = new UnconstrainedCache<number>();

async function fibonacci(n: number): Promise<number> {
  const cacheKey = n.toString();
  const cached = await cache.get(cacheKey);
  if (cached !== undefined) {
    return cached;
  }

  const result = n < 1 ? 0 : n <= 2 ? 1 : (await fibonacci(n - 1)) + (await fibonacci(n - 2));
  await cache.set(cacheKey, result);
  return result;
}

console.info(await fibonacci(10)); // 55
console.info(await fibonacci(9)); // 34 (retrieved from cache)
console.info(`Cache size ${await cache.size()}`); // 10

```
</CodeGroup>

### Using with tools

BeeAI framework's caching system seamlessly integrates with tools:

<CodeGroup>
{/* <!-- embedme python/examples/cache/tool_cache.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError
from beeai_framework.tools.search.wikipedia import (
    WikipediaTool,
    WikipediaToolInput,
)


async def main() -> None:
    wikipedia_client = WikipediaTool({"full_text": True, "cache": SlidingCache(size=100, ttl=5 * 60)})

    print(await wikipedia_client.cache.size())  # 0
    tool_input = WikipediaToolInput(query="United States")
    first = await wikipedia_client.run(tool_input)
    print(await wikipedia_client.cache.size())  # 1

    # new request with the EXACTLY same input will be retrieved from the cache
    tool_input = WikipediaToolInput(query="United States")
    second = await wikipedia_client.run(tool_input)
    print(first.get_text_content() == second.get_text_content())  # True
    print(await wikipedia_client.cache.size())  # 1


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/toolCache.ts --> */}
```ts TypeScript [expandable]
import { SlidingCache } from "beeai-framework/cache/slidingCache";
import { WikipediaTool } from "beeai-framework/tools/search/wikipedia";

const ddg = new WikipediaTool({
  cache: new SlidingCache({
    size: 100, // max 100 entries
    ttl: 5 * 60 * 1000, // 5 minutes lifespan
  }),
});

const response = await ddg.run({
  query: "United States",
});
// upcoming requests with the EXACTLY same input will be retrieved from the cache

```
</CodeGroup>

### Using with LLMs

You can also cache LLM responses to save on API costs:

<CodeGroup>

{/* <!-- embedme python/examples/cache/llm_cache.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.backend import ChatModelParameters, UserMessage
from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    llm = OllamaChatModel("granite3.3")
    llm.config(parameters=ChatModelParameters(max_tokens=25), cache=SlidingCache(size=50))

    print(await llm.cache.size())  # 0
    first = await llm.run([UserMessage("Who is Amilcar Cabral?")])
    print(await llm.cache.size())  # 1

    # new request with the EXACTLY same input will be retrieved from the cache
    second = await llm.run([UserMessage("Who is Amilcar Cabral?")])
    print(first.get_text_content() == second.get_text_content())  # True
    print(await llm.cache.size())  # 1


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/llmCache.ts --> */}
```ts TypeScript [expandable]
import { SlidingCache } from "beeai-framework/cache/slidingCache";
import { OllamaChatModel } from "beeai-framework/adapters/ollama/backend/chat";
import { UserMessage } from "beeai-framework/backend/message";

const llm = new OllamaChatModel("llama3.1");
llm.config({
  cache: new SlidingCache({
    size: 50,
  }),
  parameters: {
    maxTokens: 25,
  },
});

console.info(await llm.cache.size()); // 0
const first = await llm.create({
  messages: [new UserMessage("Who was Alan Turing?")],
});
// upcoming requests with the EXACTLY same input will be retrieved from the cache
console.info(await llm.cache.size()); // 1
const second = await llm.create({
  messages: [new UserMessage("Who was Alan Turing?")],
});
console.info(first.getTextContent() === second.getTextContent()); // true
console.info(await llm.cache.size()); // 1

```
</CodeGroup>

---

## Cache types

### UnconstrainedCache

The simplest cache type with no constraints on size or entry lifetime. Good for development and smaller applications.

<CodeGroup>
{/* <!-- embedme python/examples/cache/unconstrained_cache.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: UnconstrainedCache[int] = UnconstrainedCache()

    # Save
    await cache.set("a", 1)
    await cache.set("b", 2)

    # Read
    result = await cache.has("a")
    print(result)  # True

    # Meta
    print(cache.enabled)  # True
    print(await cache.has("a"))  # True
    print(await cache.has("b"))  # True
    print(await cache.has("c"))  # False
    print(await cache.size())  # 2

    # Delete
    await cache.delete("a")
    print(await cache.has("a"))  # False

    # Clear
    await cache.clear()
    print(await cache.size())  # 0


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/unconstrainedCache.ts --> */}
```ts TypeScript [expandable]
import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";

const cache = new UnconstrainedCache();

// Save
await cache.set("a", 1);
await cache.set("b", 2);

// Read
const result = await cache.get("a");
console.log(result); // 1

// Meta
console.log(cache.enabled); // true
console.log(await cache.has("a")); // true
console.log(await cache.has("b")); // true
console.log(await cache.has("c")); // false
console.log(await cache.size()); // 2

// Delete
await cache.delete("a");
console.log(await cache.has("a")); // false

// Clear
await cache.clear();
console.log(await cache.size()); // 0

```

</CodeGroup>

### SlidingCache

Maintains a maximum number of entries, removing the oldest entries when the limit is reached.

<CodeGroup>

{/* <!-- embedme python/examples/cache/sliding_cache.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: SlidingCache[int] = SlidingCache(
        size=3,  # (required) number of items that can be live in the cache at a single moment
        ttl=1,  # // (optional, default is Infinity) Time in seconds after the element is removed from a cache
    )

    await cache.set("a", 1)
    await cache.set("b", 2)
    await cache.set("c", 3)

    await cache.set("d", 4)  # overflow - cache internally removes the oldest entry (key "a")

    print(await cache.has("a"))  # False
    print(await cache.size())  # 3


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```
{/* <!-- embedme typescript/examples/cache/slidingCache.ts --> */}
```ts TypeScript [expandable]
import { SlidingCache } from "beeai-framework/cache/slidingCache";

const cache = new SlidingCache<number>({
  size: 3, // (required) number of items that can be live in the cache at a single moment
  ttl: 1000, // (optional, default is Infinity) Time in milliseconds after the element is removed from a cache
});

await cache.set("a", 1);
await cache.set("b", 2);
await cache.set("c", 3);

await cache.set("d", 4); // overflow - cache internally removes the oldest entry (key "a")
console.log(await cache.has("a")); // false
console.log(await cache.size()); // 3

```

</CodeGroup>

### FileCache

Persists cache data to disk, allowing data to survive if application restarts.
Use it when caches must survive process restarts or you need to share state between workers. Persisted entries still respect TTL and eviction settings, so design your limits accordingly.

<CodeGroup>

{/* <!-- embedme python/examples/cache/file_cache.py --> */}
```py Python [expandable]
import asyncio
import json
import sys
import tempfile
import time
import traceback
from collections import OrderedDict
from collections.abc import Mapping
from pathlib import Path
from typing import Generic, TypeVar

from beeai_framework.cache import BaseCache
from beeai_framework.errors import FrameworkError

T = TypeVar("T")


class JsonFileCache(BaseCache[T], Generic[T]):
    """Simple file-backed cache with optional LRU eviction and TTL support."""

    def __init__(self, path: Path, *, size: int = 128, ttl: float | None = None) -> None:
        super().__init__()
        self._path = path
        self._size = size
        self._ttl = ttl
        self._items: OrderedDict[str, tuple[T, float | None]] = OrderedDict()
        self._load_from_disk()

    @property
    def source(self) -> Path:
        return self._path

    @classmethod
    async def from_mapping(
        cls,
        path: Path,
        items: Mapping[str, T],
        *,
        size: int = 128,
        ttl: float | None = None,
    ) -> "JsonFileCache[T]":
        cache = cls(path, size=size, ttl=ttl)
        for key, value in items.items():
            await cache.set(key, value)
        return cache

    async def size(self) -> int:
        await self._purge_expired()
        return len(self._items)

    async def set(self, key: str, value: T) -> None:
        await self._purge_expired()
        expires_at = time.time() + self._ttl if self._ttl is not None else None
        if key in self._items:
            self._items.pop(key)
        self._items[key] = (value, expires_at)
        await self._enforce_capacity()
        self._dump_to_disk()

    async def get(self, key: str) -> T | None:
        await self._purge_expired()
        if key not in self._items:
            return None

        value, expires_at = self._items.pop(key)
        self._items[key] = (value, expires_at)
        return value

    async def has(self, key: str) -> bool:
        await self._purge_expired()
        return key in self._items

    async def delete(self, key: str) -> bool:
        await self._purge_expired()
        if key not in self._items:
            return False

        self._items.pop(key)
        self._dump_to_disk()
        return True

    async def clear(self) -> None:
        self._items.clear()
        if self._path.exists():
            self._path.unlink()

    async def reload(self) -> None:
        self._items.clear()
        self._load_from_disk()
        await self._purge_expired()

    async def _purge_expired(self) -> None:
        now = time.time()
        expired_keys = [
            key for key, (_, expires_at) in list(self._items.items()) if expires_at is not None and expires_at <= now
        ]
        for key in expired_keys:
            self._items.pop(key, None)
        if expired_keys:
            self._dump_to_disk()

    async def _enforce_capacity(self) -> None:
        while len(self._items) > self._size:
            oldest_key, _ = self._items.popitem(last=False)

    def _load_from_disk(self) -> None:
        if not self._path.exists():
            return

        try:
            raw = json.loads(self._path.read_text())
        except json.JSONDecodeError:
            return

        now = time.time()
        for key, payload in raw.items():
            expires_at = payload.get("expires_at")
            if expires_at is not None and expires_at <= now:
                continue
            self._items[key] = (payload["value"], expires_at)

    def _dump_to_disk(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        data = {key: {"value": value, "expires_at": expires_at} for key, (value, expires_at) in self._items.items()}
        self._path.write_text(json.dumps(data, indent=2))


async def main() -> None:
    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "bee_cache.json"
        cache: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)

        await cache.set("profile", {"name": "Bee", "role": "assistant"})
        await cache.set("settings", {"theme": "dark"})
        print(f"Cache persisted to {cache.source}")

        await cache.set("session", {"token": "abc123"})
        print(await cache.has("profile"))  # False -> evicted when capacity exceeded

        reloaded: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)
        print(await reloaded.get("settings"))  # {'theme': 'dark'}

        await asyncio.sleep(1.6)
        await reloaded.reload()
        print(await reloaded.get("session"))  # None -> TTL expired


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/fileCache.ts --> */}
```ts TypeScript [expandable]
import { FileCache } from "beeai-framework/cache/fileCache";
import * as os from "node:os";

const cache = new FileCache({
  fullPath: `${os.tmpdir()}/bee_file_cache_${Date.now()}.json`,
});
console.log(`Saving cache to "${cache.source}"`);
await cache.set("abc", { firstName: "John", lastName: "Doe" });

```

</CodeGroup>

#### With custom provider

Seed a file-backed cache from another provider when you want to warm the disk cache before first use or promote hot data captured in memory. The example below clones an `UnconstrainedCache` into the JSON file cache so new processes can reuse it immediately.

<CodeGroup>

{/* <!-- embedme python/examples/cache/file_cache_custom_provider.py --> */}
```py Python [expandable]
import asyncio
import sys
import tempfile
import traceback
from pathlib import Path
from typing import TypeVar

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError
from examples.cache.file_cache import JsonFileCache

T = TypeVar("T")


async def export_cache(provider: UnconstrainedCache[T]) -> dict[str, T]:
    """Clone an in-memory cache so that we can safely persist its content."""
    cloned = await provider.clone()
    # UnconstrainedCache stores entries in a simple dict, so cloning is inexpensive here.
    return getattr(cloned, "_provider", {}).copy()


async def main() -> None:
    memory_cache: UnconstrainedCache[int] = UnconstrainedCache()
    await memory_cache.set("tasks:open", 7)
    await memory_cache.set("tasks:closed", 12)

    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "bee_cache.json"

        file_cache = await JsonFileCache.from_mapping(path, await export_cache(memory_cache), size=10, ttl=10)
        print(f"Promoted cache to disk: {file_cache.source}")

        print(await file_cache.get("tasks:open"))  # 7
        await file_cache.set("tasks:stale", 1)
        print(await file_cache.size())  # 3

        reloaded: JsonFileCache[int] = JsonFileCache(path, size=10, ttl=10)
        print(await reloaded.get("tasks:closed"))  # 12


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/fileCacheCustomProvider.ts --> */}
```ts TypeScript [expandable]
import { FileCache } from "beeai-framework/cache/fileCache";
import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";
import os from "node:os";

const memoryCache = new UnconstrainedCache<number>();
await memoryCache.set("a", 1);

const fileCache = await FileCache.fromProvider(memoryCache, {
  fullPath: `${os.tmpdir()}/bee_file_cache.json`,
});
console.log(`Saving cache to "${fileCache.source}"`);
console.log(await fileCache.get("a")); // 1

```

</CodeGroup>

### NullCache

A special cache that implements the `BaseCache` interface but performs no caching. Useful for testing or temporarily disabling caching.

The reason for implementing is to enable [Null object pattern](https://en.wikipedia.org/wiki/Null_object_pattern).

---

## Advanced usage

### Cache decorator

Create a reusable decorator when you want to keep caching logic close to your functions without wiring cache calls manually.

<CodeGroup>

{/* <!-- embedme python/examples/cache/decorator_cache.py --> */}
```py Python [expandable]
import asyncio
import sys
import time
import traceback

from beeai_framework.cache import SlidingCache, cached
from beeai_framework.errors import FrameworkError

request_cache: SlidingCache[str] = SlidingCache(size=8, ttl=2)


class ReportGenerator:
    def __init__(self) -> None:
        self._call_counter = 0

    @cached(request_cache)
    async def generate(self, department: str) -> str:
        self._call_counter += 1
        await asyncio.sleep(0.1)
        timestamp = time.time()
        return f"{department}:{self._call_counter}@{timestamp:.0f}"


async def main() -> None:
    generator = ReportGenerator()
    first = await generator.generate("sales")
    second = await generator.generate("sales")
    print(first == second)  # True -> cached result

    await asyncio.sleep(2.1)  # TTL expired
    third = await generator.generate("sales")
    print(first == third)  # False -> cache miss, recomputed


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/decoratorCache.ts --> */}
```ts TypeScript [expandable]
import { Cache } from "beeai-framework/cache/decoratorCache";

class Generator {
  @Cache()
  get(seed: number) {
    return (Math.random() * 1000) / Math.max(seed, 1);
  }
}

const generator = new Generator();
const a = generator.get(5);
const b = generator.get(5);
console.info(a === b); // true
console.info(a === generator.get(6)); // false

```

</CodeGroup>

For more complex caching logic, you can customize the key generation:
Use custom key builders to partition cache entries per tenant or time window, and clear the cache in response to deployment events.

<CodeGroup>

{/* <!-- embedme python/examples/cache/decorator_cache_complex.py --> */}
```py Python [expandable]
import asyncio
import datetime as dt
import random
import sys
import traceback
from typing import Any

from beeai_framework.cache import BaseCache, SlidingCache, cached
from beeai_framework.errors import FrameworkError

activity_cache: SlidingCache[dict[str, Any]] = SlidingCache(size=16, ttl=5)


def session_cache_key(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
    user_id = kwargs.get("user_id") or args[0]
    scope = kwargs.get("scope", "default")
    bucket: int | None = kwargs.get("minute_bucket")
    payload = {"user_id": user_id, "scope": scope}
    if bucket is not None:
        payload["minute_bucket"] = bucket
    return BaseCache.generate_key(payload)


class FeatureFlagService:
    def __init__(self, *, caching_enabled: bool = True) -> None:
        self._enabled = caching_enabled
        self._db_hits = 0

    @cached(activity_cache, enabled=True, key_fn=session_cache_key)
    async def load_flags(
        self, user_id: str, scope: str = "default", minute_bucket: int | None = None
    ) -> dict[str, Any]:
        self._db_hits += 1
        await asyncio.sleep(0.05)
        return {
            "user": user_id,
            "scope": scope,
            "db_hits": self._db_hits,
            "flags": {"beta_search": random.choice([True, False])},
            "refreshed_at": dt.datetime.now(dt.UTC).isoformat(timespec="seconds"),
        }


async def main() -> None:
    service = FeatureFlagService()
    bucket = int(dt.datetime.now(dt.UTC).timestamp() // 60)

    first = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    second = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    print(first == second)  # True -> same cache key within a minute bucket

    await activity_cache.clear()  # Manual invalidation when new feature set deployed
    refreshed = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    print(refreshed["db_hits"])  # 2 -> cache miss due to clear

    # Changing scope hits a different cache entry without flushing existing data.
    other_scope = await service.load_flags("42", scope="viewer", minute_bucket=bucket)
    print(other_scope["scope"])  # viewer


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/decoratorCacheComplex.ts --> */}
```ts TypeScript [expandable]
import { Cache, SingletonCacheKeyFn } from "beeai-framework/cache/decoratorCache";

class MyService {
  @Cache({
    cacheKey: SingletonCacheKeyFn,
    ttl: 3600,
    enumerable: true,
    enabled: true,
  })
  get id() {
    return Math.floor(Math.random() * 1000);
  }

  reset() {
    Cache.getInstance(this, "id").clear();
  }
}

const service = new MyService();
const a = service.id;
console.info(a === service.id); // true
service.reset();
console.info(a === service.id); // false

```

</CodeGroup>

### CacheFn helper

For more dynamic caching needs, the `CacheFn` helper provides a functional approach:
It is well-suited for API tokens or other resources that return an expiry with each refresh—call `update_ttl` before returning the value so the cache matches the upstream lifetime.

<CodeGroup>

{/* <!-- embedme python/examples/cache/cache_fn.py --> */}
```py Python [expandable]
import asyncio
import random
import sys
import traceback

from typing_extensions import TypedDict

from beeai_framework.cache import CacheFn
from beeai_framework.errors import FrameworkError


class TokenResponse(TypedDict):
    token: str
    expires_in: float


async def main() -> None:
    async def fetch_api_token() -> str:
        response: TokenResponse = {"token": f"TOKEN-{random.randint(1000, 9999)}", "expires_in": 0.2}
        get_token.update_ttl(response["expires_in"])
        await asyncio.sleep(0.05)
        return response["token"]

    get_token = CacheFn.create(fetch_api_token, default_ttl=0.1)

    first = await get_token()
    second = await get_token()
    print(first == second)  # True -> cached value

    await asyncio.sleep(0.25)
    refreshed = await get_token()
    print(first == refreshed)  # False -> TTL elapsed, value refreshed


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/cache/cacheFn.ts --> */}
```ts TypeScript [expandable]
import { CacheFn } from "beeai-framework/cache/decoratorCache";
import { setTimeout } from "node:timers/promises";

const getSecret = CacheFn.create(
  async () => {
    // instead of mocking response you would do a real fetch request
    const response = await Promise.resolve({ secret: Math.random(), expiresIn: 100 });
    getSecret.updateTTL(response.expiresIn);
    return response.secret;
  },
  {}, // options object
);

const token = await getSecret();
console.info(token === (await getSecret())); // true
await setTimeout(150);
console.info(token === (await getSecret())); // false

```

</CodeGroup>

---

## Creating a custom cache provider

You can create your own cache implementation by extending the `BaseCache` class:

<CodeGroup>

{/* <!-- embedme python/examples/cache/custom.py --> */}
```py Python [expandable]
from typing import TypeVar

from beeai_framework.cache import BaseCache

T = TypeVar("T")


class CustomCache(BaseCache[T]):
    async def size(self) -> int:
        raise NotImplementedError("CustomCache 'size' not yet implemented")

    async def set(self, _key: str, _value: T) -> None:
        raise NotImplementedError("CustomCache 'set' not yet implemented")

    async def get(self, key: str) -> T | None:
        raise NotImplementedError("CustomCache 'get' not yet implemented")

    async def has(self, key: str) -> bool:
        raise NotImplementedError("CustomCache 'has' not yet implemented")

    async def delete(self, key: str) -> bool:
        raise NotImplementedError("CustomCache 'delete' not yet implemented")

    async def clear(self) -> None:
        raise NotImplementedError("CustomCache 'clear' not yet implemented")

```
{/* <!-- embedme typescript/examples/cache/custom.ts --> */}
```ts TypeScript [expandable]
import { BaseCache } from "beeai-framework/cache/base";
import { NotImplementedError } from "beeai-framework/errors";

export class CustomCache<T> extends BaseCache<T> {
  size(): Promise<number> {
    throw new NotImplementedError();
  }

  set(key: string, value: T): Promise<void> {
    throw new NotImplementedError();
  }

  get(key: string): Promise<T | undefined> {
    throw new NotImplementedError();
  }

  has(key: string): Promise<boolean> {
    throw new NotImplementedError();
  }

  delete(key: string): Promise<boolean> {
    throw new NotImplementedError();
  }

  clear(): Promise<void> {
    throw new NotImplementedError();
  }

  createSnapshot() {
    throw new NotImplementedError();
  }

  loadSnapshot(snapshot: ReturnType<typeof this.createSnapshot>): void {
    throw new NotImplementedError();
  }
}

```
</CodeGroup>

---

## Examples

<CardGroup cols={2}>
  <Card title="Python" icon="python" href="https://github.com/i-am-bee/beeai-framework/tree/main/python/examples/cache">
    Explore reference cache implementations in Python
  </Card>
  <Card title="TypeScript" icon="js" href="https://github.com/i-am-bee/beeai-framework/tree/main/typescript/examples/cache">
    Explore reference cache implementations in TypeScript
  </Card>
</CardGroup>
